{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Verify Spark Context object\n",
    "Check whether the Spark context is available. If evaluating `sc` raises an exception, troubleshoot the Spark setup before continuing; otherwise, you are good to go ahead."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<pyspark.context.SparkContext at 0x7fe705e26828>"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sc"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Build an RDD from a Python collection"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "distData = sc.parallelize([1, 2, 3, 4])\n",
    "distData"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[1, 2, 3, 4]"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "distData.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Basic Statistics using RDD of numbers"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "from random import random"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[0.09126739620741864,\n",
       " 0.6088649788783818,\n",
       " 0.14085507465130842,\n",
       " 0.036607534023739174,\n",
       " 0.8389527552169032,\n",
       " 0.6471827609841071,\n",
       " 0.5432516459707021,\n",
       " 0.24845442208483226,\n",
       " 0.7483825380122531,\n",
       " 0.30687619311500813]"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "l = [random() for _ in range(10)]\n",
    "l"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "rdd = sc.parallelize(l, numSlices=4) # no of partitions = 4"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "(0.8389527552169032, 4.210695299144654)"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd.max(), rdd.sum()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "(count: 10, mean: 0.42106952991446545, stdev: 0.27586585452, max: 0.838952755217, min: 0.0366075340237)"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd.stats()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 1:\n",
    "\n",
    "1. Create a directory in HDFS called movielens and load movies.csv and ratings.csv into it from the ~/Downloads/datasets/movie-lens directory.  \n",
    "2. Load movies.csv into moviesRdd\n",
    "3. Load ratings.csv into ratingsRdd\n",
    "4. Find the top 10 movies by average rating. Consider only movies that have at least 100 ratings.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "total 3424\r\n",
      "-rw-r--r-- 1 cloudera cloudera  207997 Jul  2 20:49 links.csv\r\n",
      "-rw-r--r-- 1 cloudera cloudera  515700 Jul  2 20:49 movies.csv\r\n",
      "-rw-r--r-- 1 cloudera cloudera 2580392 Jul  2 20:49 ratings.csv\r\n",
      "-rw-r--r-- 1 cloudera cloudera  199073 Jul  2 20:49 tags.csv\r\n"
     ]
    }
   ],
   "source": [
    "!ls -l ~/Downloads/datasets/movie-lens"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "10330"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "movies = sc.textFile(\"/user/cloudera/movielens/movies\")\n",
    "movies.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "movieId,title,genres\n",
      "1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy\n",
      "2,Jumanji (1995),Adventure|Children|Fantasy\n",
      "3,Grumpier Old Men (1995),Comedy|Romance\n",
      "4,Waiting to Exhale (1995),Comedy|Drama|Romance\n",
      "5,Father of the Bride Part II (1995),Comedy\n",
      "6,Heat (1995),Action|Crime|Thriller\n",
      "7,Sabrina (1995),Comedy|Romance\n",
      "8,Tom and Huck (1995),Adventure|Children\n",
      "9,Sudden Death (1995),Action\n"
     ]
    }
   ],
   "source": [
    "for r in movies.take(10):\n",
    "    print(r)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy\n",
      "2,Jumanji (1995),Adventure|Children|Fantasy\n",
      "3,Grumpier Old Men (1995),Comedy|Romance\n",
      "4,Waiting to Exhale (1995),Comedy|Drama|Romance\n",
      "5,Father of the Bride Part II (1995),Comedy\n",
      "6,Heat (1995),Action|Crime|Thriller\n",
      "7,Sabrina (1995),Comedy|Romance\n",
      "8,Tom and Huck (1995),Adventure|Children\n",
      "9,Sudden Death (1995),Action\n",
      "10,GoldenEye (1995),Action|Adventure|Thriller\n"
     ]
    }
   ],
   "source": [
    "moviesData = movies.filter(lambda line: not line.startswith(\"movieId\"))\n",
    "for r in moviesData.take(10):\n",
    "    print(r)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "105340"
      ]
     },
     "execution_count": 13,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ratings = sc.textFile(\"/user/cloudera/movielens/ratings\")\n",
    "ratings.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "userId,movieId,rating,timestamp\n",
      "1,16,4.0,1217897793\n",
      "1,24,1.5,1217895807\n",
      "1,32,4.0,1217896246\n",
      "1,47,4.0,1217896556\n",
      "1,50,4.0,1217896523\n",
      "1,110,4.0,1217896150\n",
      "1,150,3.0,1217895940\n",
      "1,161,4.0,1217897864\n",
      "1,165,3.0,1217897135\n"
     ]
    }
   ],
   "source": [
    "for r in ratings.take(10):\n",
    "    print(r)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1,16,4.0,1217897793\n",
      "1,24,1.5,1217895807\n",
      "1,32,4.0,1217896246\n",
      "1,47,4.0,1217896556\n",
      "1,50,4.0,1217896523\n",
      "1,110,4.0,1217896150\n",
      "1,150,3.0,1217895940\n",
      "1,161,4.0,1217897864\n",
      "1,165,3.0,1217897135\n",
      "1,204,0.5,1217895786\n"
     ]
    }
   ],
   "source": [
    "ratingsData = ratings.filter(lambda line: not line.startswith(\"userId\"))\n",
    "for r in ratingsData.take(10):\n",
    "    print(r)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'movieId,title,genres'"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "movies.first()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'userId,movieId,rating,timestamp'"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ratings.first()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "(1, 'Toy Story (1995)')"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "movies_by_movieid = moviesData\\\n",
    ".map(lambda line: line.split(\",\"))\\\n",
    ".map(lambda tokens: (int(tokens[0]), tokens[1]))\n",
    "\n",
    "movies_by_movieid.first()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "(16, 4.0)"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ratings_by_movieid = ratingsData\\\n",
    ".map(lambda line: line.split(\",\"))\\\n",
    ".map(lambda tokens: (int(tokens[1]), float(tokens[2])))\n",
    "\n",
    "ratings_by_movieid.first()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "((318, '\"Shawshank Redemption'), (4.454545454545454, 308))\n",
      "((858, '\"Godfather'), (4.392857142857143, 210))\n",
      "((50, '\"Usual Suspects'), (4.328947368421052, 228))\n",
      "((1136, 'Monty Python and the Holy Grail (1975)'), (4.3019480519480515, 154))\n",
      "((527, \"Schindler's List (1993)\"), (4.296370967741935, 248))\n",
      "((1193, \"One Flew Over the Cuckoo's Nest (1975)\"), (4.2727272727272725, 143))\n",
      "((608, 'Fargo (1996)'), (4.2711442786069655, 201))\n",
      "((2571, '\"Matrix'), (4.264367816091954, 261))\n",
      "((1221, '\"Godfather: Part II'), (4.260714285714286, 140))\n",
      "((1213, 'Goodfellas (1990)'), (4.2592592592592595, 135))\n"
     ]
    }
   ],
   "source": [
    "top_10 = movies_by_movieid.join(ratings_by_movieid)\\\n",
    ".map(lambda p: ((p[0], p[1][0]), p[1][1]))\\\n",
    ".groupByKey().mapValues(lambda values: (sum(values)/len(values), len(values)))\\\n",
    ".filter(lambda p: p[1][1] >= 100)\\\n",
    ".sortBy(lambda p: p[1], False)\\\n",
    ".take(10)\n",
    "\n",
    "for m in top_10:\n",
    "    print(m)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "(4096, ('\"Curse', 4.0))"
      ]
     },
     "execution_count": 21,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "movies_by_movieid.join(ratings_by_movieid)\\\n",
    ".first()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "((4096, '\"Curse'), 4.0)"
      ]
     },
     "execution_count": 22,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "movies_by_movieid.join(ratings_by_movieid)\\\n",
    ".map(lambda p: ((p[0], p[1][0]), p[1][1]))\\\n",
    ".first()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "((1036, 'Die Hard (1988)'), (3.918181818181818, 165))"
      ]
     },
     "execution_count": 23,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "top_10 = movies_by_movieid.join(ratings_by_movieid)\\\n",
    ".map(lambda p: ((p[0], p[1][0]), p[1][1]))\\\n",
    ".groupByKey().mapValues(lambda values: (sum(values)/len(values), len(values)))\\\n",
    ".filter(lambda p: p[1][1] >= 100)\n",
    "\n",
    "top_10.first()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "((318, '\"Shawshank Redemption'), (4.454545454545454, 308))\n",
      "((858, '\"Godfather'), (4.392857142857143, 210))\n",
      "((50, '\"Usual Suspects'), (4.328947368421052, 228))\n",
      "((1136, 'Monty Python and the Holy Grail (1975)'), (4.3019480519480515, 154))\n",
      "((527, \"Schindler's List (1993)\"), (4.296370967741935, 248))\n",
      "((1193, \"One Flew Over the Cuckoo's Nest (1975)\"), (4.2727272727272725, 143))\n",
      "((608, 'Fargo (1996)'), (4.2711442786069655, 201))\n",
      "((2571, '\"Matrix'), (4.264367816091954, 261))\n",
      "((1221, '\"Godfather: Part II'), (4.260714285714286, 140))\n",
      "((1213, 'Goodfellas (1990)'), (4.2592592592592595, 135))\n"
     ]
    }
   ],
   "source": [
    "top_10 = movies_by_movieid.join(ratings_by_movieid)\\\n",
    ".map(lambda p: ((p[0], p[1][0]), p[1][1]))\\\n",
    ".groupByKey().mapValues(lambda values: (sum(values)/len(values), len(values)))\\\n",
    ".filter(lambda p: p[1][1] >= 100)\\\n",
    ".sortBy(lambda p: p[1], False)\\\n",
    "\n",
    "for m in top_10.take(10):\n",
    "    print(m)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Exercise 2: use the stocks.csv file for this exercise\n",
    " \n",
    "- Load the stocks.csv file in a new Rdd \n",
    "- Find stocks records in 2016 \n",
    "- Find stocks top 10 records based on the trading volume in 2016 \n",
    "- Find average volume per stock traded in 2016 \n",
    "- Find top 10 stocks based on highest trading volume in 2016  \n",
    " "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Load the stocks.csv file in a new Rdd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "stocks = sc.textFile(\"stocks\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "1857093"
      ]
     },
     "execution_count": 26,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "stocks.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'date,open,high,low,close,volume,adjclose,symbol'"
      ]
     },
     "execution_count": 27,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "stocks.first()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "stocks MapPartitionsRDD[84] at textFile at NativeMethodAccessorImpl.java:0"
      ]
     },
     "execution_count": 28,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "stocks.cache()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Find stocks records in 2016"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "78041"
      ]
     },
     "execution_count": 29,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "stocks2016 = stocks.filter(lambda line: line.startswith(\"2016\"))\n",
    "stocks2016.count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Find stocks top 10 records based on the trading volume in 2016."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'2016-01-04,46.119999,46.130001,45.360001,45.799999,3472200.0,44.870315,XLNX'"
      ]
     },
     "execution_count": 30,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "stocks2016.first()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['2016-01-26,61.740002,62.509998,61.509998,62.349998,999900.0,61.430663,AWK',\n",
       " '2016-08-03,75.599998,76.099998,75.559998,76.089996,999900.0,75.520442,KLAC',\n",
       " '2016-05-23,12.48,12.56,12.31,12.48,9997600.0,12.307377,KEY',\n",
       " '2016-02-22,40.139999,40.93,40.07,40.84,9997300.0,40.60075,AAL',\n",
       " '2016-07-12,38.470001,38.91,38.240002,38.790001,9997200.0,38.790001,PYPL',\n",
       " '2016-04-11,104.059998,104.059998,102.300003,102.620003,999700.0,102.350541,INTU',\n",
       " '2016-02-01,48.939999,49.990002,48.790001,49.900002,9996700.0,48.283396,SO',\n",
       " '2016-06-24,41.869999,42.73,40.630001,40.970001,9996400.0,40.970001,VIAB',\n",
       " '2016-01-04,117.25,117.730003,115.870003,117.580002,9995000.0,115.844428,MCD',\n",
       " '2016-02-23,131.289993,131.910004,130.910004,131.529999,999500.0,129.687866,KMB']"
      ]
     },
     "execution_count": 31,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "stocks2016.sortBy(lambda line: line.split(\",\")[5], False).take(10)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Find average volume per stock traded in 2016"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "('XLNX', 3472200.0)"
      ]
     },
     "execution_count": 32,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "stocks2016.map(lambda line: line.split(\",\"))\\\n",
    ".map(lambda tokens: (tokens[7], float(tokens[5])))\\\n",
    ".first()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "('EW', 1808258.9743589743)"
      ]
     },
     "execution_count": 33,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "stocks2016.map(lambda line: line.split(\",\"))\\\n",
    ".map(lambda tokens: (tokens[7], float(tokens[5])))\\\n",
    ".groupByKey().mapValues(lambda volumes: sum(volumes)/ len(volumes))\\\n",
    ".first()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Find top 10 stocks based on highest trading volume in 2016"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "('BAC', 109953689.74358974)\n",
      "('FCX', 47979558.333333336)\n",
      "('CHK', 41622735.256410256)\n",
      "('AAPL', 40944183.974358976)\n",
      "('GE', 37751663.461538464)\n",
      "('F', 37432197.43589743)\n",
      "('PFE', 35777183.974358976)\n",
      "('MSFT', 34194448.07692308)\n",
      "('FB', 28902566.025641024)\n",
      "('MU', 27260807.692307692)\n"
     ]
    }
   ],
   "source": [
    "top10 = stocks2016.map(lambda line: line.split(\",\"))\\\n",
    ".map(lambda tokens: (tokens[7], float(tokens[5])))\\\n",
    ".groupByKey().mapValues(lambda volumes: sum(volumes)/ len(volumes))\\\n",
    ".sortBy(lambda p: p[1], False)\\\n",
    ".take(10)\n",
    "\n",
    "for r in top10:\n",
    "    print(r)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'2000-07-17,95.4375,97.5,92.75,96.625,3508100.0,74.269199,XLNX'"
      ]
     },
     "execution_count": 35,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "stocksData = stocks.filter(lambda line: not line.startswith(\"date\"))\n",
    "stocksData.first()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Parse the date field as datetime object"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from datetime import datetime"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [],
   "source": [
    "d = datetime.strptime(\"2016-07-17\", \"%Y-%m-%d\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "datetime.datetime"
      ]
     },
     "execution_count": 38,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "type(d)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "2016"
      ]
     },
     "execution_count": 39,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "d.year"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'2016-01-04,46.119999,46.130001,45.360001,45.799999,3472200.0,44.870315,XLNX'"
      ]
     },
     "execution_count": 40,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "stocksData.filter(lambda line: datetime.strptime(line.split(\",\")[0],\"%Y-%m-%d\").year == 2016).first()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Call foreachPartition or mapPartitions to operate over an entire partition"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {
    "scrolled": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['0.6688504710050863', '0.5298388694075957', '0.04317059809978663', '0.43160772581803797', '0.13492859253903766', '0.3782786241535713', '0.20573063094285304', '0.563796213609239', '0.716333149130251', '0.07209716251966614']\n"
     ]
    }
   ],
   "source": [
    "def save_to_db(c):\n",
    "    \"\"\"\n",
    "    More practical use cases: \n",
    "        A. open database connection from each partition and save the record in bulk\n",
    "        B. make web service calls and send multiple records, if web service calls support that\n",
    "    \n",
    "    Steps: \n",
    "        1. open db connection or web service connection\n",
    "        2. send all c in bulk operation \n",
    "        3. close the connection\n",
    "    \"\"\"\n",
    "    print(\" \".join(c))\n",
    "\n",
    "rdd = sc.parallelize([str(random()) for _ in range(10)], 3)\n",
    "print(rdd.collect())\n",
    "rdd.foreachPartition(save_to_db)\n",
    "# Check jupyter launching console to view the numbers"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Collect each partition "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['A', 'B', 'C', 'E', 'F', 'G', 'H']"
      ]
     },
     "execution_count": 42,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd = sc.parallelize(list(\"ABCEFGH\"), 3)\n",
    "rdd.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[['A', 'B'], ['C', 'E'], ['F', 'G', 'H']]"
      ]
     },
     "execution_count": 43,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd.glom().collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Save stocks records in multiple files - one for each symbol"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {
    "scrolled": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'FTR': 190, 'EW': 164, 'M': 284, 'DFS': 122, 'ALB': 26, 'SIG': 401, 'WMT': 483, 'CBS': 78, 'D': 118, 'LNC': 276, 'GT': 205, 'NTRS': 333, 'SWKS': 419, 'TGNA': 430, 'SYMC': 423, 'PH': 359, 'HOT': 221, 'AVB': 46, 'AFL': 19, 'EFX': 147, 'BLL': 66, 'PFG': 356, 'HCP': 214, 'LMT': 275, 'HD': 215, 'CMA': 92, 'MYL': 317, 'CMI': 96, 'MPC': 308, 'UA.C': 448, 'NEM': 322, 'DNB': 133, 'DG': 123, 'FAST': 170, 'YUM': 498, 'GIS': 195, 'MMM': 302, 'LB': 266, 'EIX': 148, 'URI': 458, 'FOX': 185, 'GE': 192, 'TDG': 428, 'UAL': 449, 'SYK': 422, 'ESS': 160, 'AIZ': 23, 'HSIC': 228, 'AET': 18, 'COH': 102, 'AGN': 20, 'CPB': 106, 'GPN': 201, 'WHR': 479, 'GM': 197, 'CAG': 73, 'CI': 88, 'TEL': 429, 'EA': 143, 'WDC': 475, 'GOOGL': 199, 'TIF': 432, 'KEY': 253, 'XRX': 495, 'FBHS': 172, 'UHS': 451, 'ES': 158, 'AMZN': 37, 'DISCK': 129, 'IRM': 241, 'PCAR': 349, 'HBI': 211, 'BHI': 62, 'HPE': 223, 'NWL': 336, 'MRK': 309, 'WMB': 482, 'FTI': 189, 'CME': 94, 'CB': 76, 'FLS': 183, 'ALXN': 30, 'HRL': 226, 'PEG': 353, 'ADI': 10, 'CAH': 74, 'DTE': 139, 'FSLR': 188, 'ORCL': 343, 'MOS': 307, 'NVDA': 335, 'TXN': 444, 'RCL': 382, 'SPGI': 409, 'WFM': 478, 'KHC': 254, 'JEC': 247, 'DGX': 124, 'KIM': 255, 'OKE': 341, 'ACN': 8, 'WYN': 487, 'CNP': 99, 'TXT': 445, 'STZ': 417, 'CMCSA': 93, 'LUV': 281, 'CCL': 80, 'NFX': 324, 'EQT': 157, 'VRSN': 469, 'AWK': 49, 'MCD': 290, 'RRC': 392, 'LLTC': 272, 'GOOG': 198, 'XEC': 489, 'AKAM': 25, 'NBL': 319, 'CSCO': 108, 'AZO': 52, 'NTAP': 332, 'CAT': 75, 'CSX': 110, 'EMR': 152, 'KO': 260, 'APC': 42, 'URBN': 457, 'PYPL': 377, 'AEE': 15, 'PGR': 358, 'MAC': 286, 'MKC': 299, 'ETFC': 161, 'IP': 238, 'K': 252, 'MRO': 310, 'CFG': 84, 'FITB': 179, 'AMGN': 34, 'HSY': 230, 'VNO': 467, 'EL': 149, 'XYL': 496, 'IPG': 239, 'RHT': 386, 'AME': 32, 'CBG': 77, 'GPC': 200, 'HAL': 207, 'DD': 120, 'DLPH': 130, 'PCG': 350, 'TWX': 443, 'COG': 101, 'AA': 1, 'MSFT': 312, 'SBUX': 395, 'GPS': 202, 'SHW': 400, 'NEE': 321, 'APD': 43, 'ROST': 391, 'AMT': 36, 'ANTM': 39, 'FCX': 173, 'NLSN': 327, 'PSA': 371, 
'BBBY': 56, 'MU': 315, 'MAS': 288, 'MJN': 298, 'SRE': 412, 'LEN': 268, 'BK': 64, 'DE': 121, 'HOLX': 219, 'TSS': 442, 'DVA': 141, 'JPM': 250, 'ROK': 389, 'PAYX': 346, 'DPS': 137, 'BIIB': 63, 'SO': 407, 'MAT': 289, 'MHK': 297, 'CHK': 86, 'CINF': 89, 'SNA': 405, 'SCHW': 397, 'EBAY': 144, 'CMG': 95, 'PNW': 366, 'SYY': 424, 'SEE': 399, 'ABC': 6, 'DVN': 142, 'DRI': 138, 'AYI': 51, 'COF': 100, 'JNPR': 249, 'ALLE': 29, 'KMX': 259, 'LH': 269, 'ADM': 11, 'RIG': 387, 'OMC': 342, 'APH': 44, 'OXY': 345, 'CNC': 98, 'NOV': 329, 'WBA': 474, 'ADP': 12, 'HCN': 213, 'NWS': 337, 'CF': 83, 'WY': 486, 'VFC': 463, 'KLAC': 256, 'UNM': 454, 'MCK': 292, 'HPQ': 224, 'RTN': 394, 'HIG': 217, 'SWK': 418, 'KMB': 257, 'SE': 398, 'DO': 134, 'MA': 285, 'SWN': 420, 'ETR': 163, 'HST': 229, 'MON': 306, 'JBHT': 245, 'MCO': 293, 'PKI': 361, 'MO': 305, 'DUK': 140, 'PPG': 367, 'XL': 491, 'JWN': 251, 'LRCX': 279, 'PM': 363, 'ADS': 13, 'FRT': 187, 'AMG': 33, 'C': 71, 'CMS': 97, 'ALL': 28, 'F': 169, 'RHI': 385, 'AEP': 16, 'MMC': 301, 'FLR': 182, 'UNH': 453, 'CLX': 91, 'CTXS': 114, 'EXPE': 167, 'COST': 105, 'CHD': 85, 'BSX': 68, 'PG': 357, 'BBT': 57, 'XLNX': 492, 'TMK': 434, 'CVS': 115, 'LNT': 277, 'PX': 375, 'DOV': 135, 'LLY': 273, 'ZBH': 499, 'EXC': 165, 'AXP': 50, 'MNST': 304, 'BBY': 58, 'YHOO': 497, 'A': 0, 'REGN': 383, 'CRM': 107, 'RL': 388, 'KSS': 263, 'HES': 216, 'NKE': 326, 'FISV': 178, 'CCI': 79, 'HUM': 231, 'WLTW': 480, 'ISRG': 242, 'BA': 53, 'PHM': 360, 'SJM': 402, 'R': 380, 'ESRX': 159, 'BEN': 61, 'PNR': 365, 'GS': 204, 'DHI': 125, 'SLB': 403, 'EXR': 168, 'VZ': 472, 'AMAT': 31, 'NFLX': 323, 'HAR': 208, 'TROW': 437, 'PCLN': 351, 'AAPL': 4, 'ATVI': 45, 'MNK': 303, 'KMI': 258, 'LLL': 271, 'ED': 146, 'INTU': 237, 'GLW': 196, 'PNC': 364, 'MDLZ': 294, 'FB': 171, 'IVZ': 244, 'GWW': 206, 'RAI': 381, 'TSN': 440, 'IR': 240, 'UDR': 450, 'VLO': 465, 'O': 339, 'VIAB': 464, 'AVGO': 47, 'EOG': 154, 'MUR': 316, 'HRS': 227, 'WAT': 473, 'ENDP': 153, 'PRU': 370, 'UTX': 460, 'DIS': 127, 'VRTX': 470, 'HBAN': 210, 
'DOW': 136, 'CHRW': 87, 'COP': 104, 'LOW': 278, 'MS': 311, 'FLIR': 181, 'CA': 72, 'ABT': 7, 'LUK': 280, 'CSRA': 109, 'EMC': 150, 'QCOM': 378, 'ORLY': 344, 'RSG': 393, 'NSC': 331, 'SPLS': 410, 'TYC': 446, 'DISCA': 128, 'ULTA': 452, 'AAP': 3, 'NOC': 328, 'SPG': 408, 'SCG': 396, 'DLTR': 132, 'BXP': 70, 'FL': 180, 'WYNN': 488, 'CELG': 81, 'EXPD': 166, 'FE': 175, 'QRVO': 379, 'BWA': 69, 'TSCO': 439, 'NUE': 334, 'WM': 481, 'AVY': 48, 'FDX': 174, 'FMC': 184, 'T': 425, 'ILMN': 235, 'AES': 17, 'SRCL': 411, 'VTR': 471, 'AAL': 2, 'PBI': 348, 'DAL': 119, 'PVH': 373, 'STT': 415, 'TGT': 431, 'MSI': 313, 'BAX': 55, 'CVX': 116, 'HOG': 218, 'UPS': 456, 'PFE': 355, 'FOXA': 186, 'NRG': 330, 'NAVI': 318, 'APA': 41, 'XRAY': 494, 'MET': 296, 'CTAS': 111, 'LM': 274, 'STX': 416, 'STI': 413, 'LYB': 283, 'ABBV': 5, 'LKQ': 270, 'PXD': 376, 'NWSA': 338, 'JCI': 246, 'L': 265, 'CXO': 117, 'IBM': 232, 'FFIV': 176, 'BCR': 59, 'ADBE': 9, 'AMP': 35, 'GGP': 193, 'OI': 340, 'TJX': 433, 'RF': 384, 'PLD': 362, 'WFC': 477, 'EQR': 156, 'CL': 90, 'JNJ': 248, 'ECL': 145, 'V': 461, 'HON': 220, 'ETN': 162, 'ROP': 390, 'VRSK': 468, 'HCA': 212, 'TSO': 441, 'TRIP': 436, 'PSX': 372, 'UNP': 455, 'KR': 262, 'DHR': 126, 'LVLT': 282, 'KSU': 264, 'MDT': 295, 'TRV': 438, 'TMO': 435, 'BAC': 54, 'ALK': 27, 'ITW': 243, 'INTC': 236, 'CTSH': 113, 'AJG': 24, 'BLK': 65, 'USB': 459, 'EMN': 151, 'MAR': 287, 'HP': 222, 'PPL': 368, 'VMC': 466, 'DLR': 131, 'SYF': 421, 'AN': 38, 'GILD': 194, 'WEC': 476, 'WU': 485, 'ADSK': 14, 'AIV': 22, 'XEL': 490, 'PBCT': 347, 'ZION': 500, 'BDX': 60, 'ICE': 233, 'HAS': 209, 'COL': 103, 'HRB': 225, 'symbol': 501, 'MLM': 300, 'PDCO': 352, 'GD': 191, 'PWR': 374, 'AIG': 21, 'TAP': 426, 'STJ': 414, 'VAR': 462, 'AON': 40, 'PRGO': 369, 'BMY': 67, 'SNI': 406, 'CERN': 82, 'EQIX': 155, 'TDC': 427, 'GRMN': 203, 'WRK': 484, 'LEG': 267, 'IFF': 234, 'NDAQ': 320, 'PEP': 354, 'SLG': 404, 'MCHP': 291, 'UA': 447, 'CTL': 112, 'XOM': 493, 'MTB': 314, 'NI': 325, 'FIS': 177, 'KORS': 261}\n"
     ]
    }
   ],
   "source": [
    "symbols = stocks.map(lambda l: l.split(\",\")[7]).distinct().collect() # Find distinct stock symbols\n",
    "symbols = sorted(symbols)  # Sort the symbols alphabetically \n",
    "\n",
    "# Add an index for each symbol and convert the collection to Map for lookup\n",
    "symbols = dict(list(zip(symbols, range(len(symbols)))))\n",
    "print(symbols)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "502"
      ]
     },
     "execution_count": 45,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "len(symbols)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "stocks_partitioned = (stocks\n",
    ".keyBy(lambda r: symbols.get(r.split(\",\")[7]))\n",
    ".partitionBy(len(symbols), lambda i: i)\n",
    ".map(lambda p: p[1], preservesPartitioning = True))\n",
    "\n",
    "stocks_partitioned.saveAsTextFile(\"stocks_by_symbol\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 47,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "partitioner: <pyspark.rdd.Partitioner object at 0x7fe70502b9b0> , num_partitions:  502\n"
     ]
    }
   ],
   "source": [
    "print(\"partitioner:\", stocks_partitioned.partitioner,\n",
    "      \", num_partitions: \", stocks_partitioned.partitioner.numPartitions)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.4.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
